package org.apache.hadoop.hbase.procedure2;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Deque;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.exceptions.IllegalArgumentIOException;
import org.apache.hadoop.hbase.log.HBaseMarkers;
import org.apache.hadoop.hbase.procedure2.Procedure.LockState;
import org.apache.hadoop.hbase.procedure2.store.ProcedureStore;
import org.apache.hadoop.hbase.procedure2.store.ProcedureStore.ProcedureIterator;
import org.apache.hadoop.hbase.procedure2.store.ProcedureStore.ProcedureStoreListener;
import org.apache.hadoop.hbase.procedure2.util.StringUtils;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.IdLock;
import org.apache.hadoop.hbase.util.NonceKey;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;

import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureState;

/**
 * Thread Pool that executes the submitted procedures.
 * The executor has a ProcedureStore associated.
 * Each operation is logged and on restart the pending procedures are resumed.
 *
 * Unless the Procedure code throws an error (e.g. invalid user input)
 * the procedure will complete (at some point in time), On restart the pending
 * procedures are resumed and the once failed will be rolledback.
 *
 * The user can add procedures to the executor via submitProcedure(proc)
 * check for the finished state via isFinished(procId)
 * and get the result via getResult(procId)
 */
@InterfaceAudience.Private
public class ProcedureExecutor<TEnvironment> {
    private static final Logger LOG = LoggerFactory.getLogger(ProcedureExecutor.class);

    public static final String CHECK_OWNER_SET_CONF_KEY = "hbase.procedure.check.owner.set";
    private static final boolean DEFAULT_CHECK_OWNER_SET = false;

    public static final String WORKER_KEEP_ALIVE_TIME_CONF_KEY = "hbase.procedure.worker.keep.alive.time.msec";
    private static final long DEFAULT_WORKER_KEEP_ALIVE_TIME = TimeUnit.MINUTES.toMillis(1);

    public static final String EVICT_TTL_CONF_KEY = "hbase.procedure.cleaner.evict.ttl";
    static final int DEFAULT_EVICT_TTL = 15 * 60000; // 15min

    public static final String EVICT_ACKED_TTL_CONF_KEY = "hbase.procedure.cleaner.acked.evict.ttl";
    static final int DEFAULT_ACKED_EVICT_TTL = 5 * 60000; // 5min

    /**
     * {@link #testing} is non-null when ProcedureExecutor is being tested. Tests will try to
     * break PE having it fail at various junctures. When non-null, testing is set to an instance of
     * the below internal {@link Testing} class with flags set for the particular test.
     */
    volatile Testing testing = null;

    /**
     * Class with parameters describing how to fail/die when in testing-context.
     */
    public static class Testing {
        protected volatile boolean killIfHasParent = true;
        protected volatile boolean killIfSuspended = false;

        /**
         * Kill the PE BEFORE we store state to the WAL. Good for figuring out if a Procedure is
         * persisting all the state it needs to recover after a crash.
         */
        protected volatile boolean killBeforeStoreUpdate = false;
        protected volatile boolean toggleKillBeforeStoreUpdate = false;

        /**
         * Set when we want to fail AFTER state has been stored into the WAL. Rarely used. HBASE-20978
         * is about a case where memory-state was being set after store to WAL where a crash could
         * cause us to get stuck. This flag allows killing at what was a vulnerable time.
         */
        protected volatile boolean killAfterStoreUpdate = false;
        protected volatile boolean toggleKillAfterStoreUpdate = false;

        protected boolean shouldKillBeforeStoreUpdate() {
            final boolean kill = this.killBeforeStoreUpdate;
            if(this.toggleKillBeforeStoreUpdate) {
                this.killBeforeStoreUpdate = !kill;
                LOG.warn("Toggle KILL before store update to: " + this.killBeforeStoreUpdate);
            }
            return kill;
        }

        protected boolean shouldKillBeforeStoreUpdate(boolean isSuspended, boolean hasParent) {
            if(isSuspended && !killIfSuspended) {
                return false;
            }
            if(hasParent && !killIfHasParent) {
                return false;
            }
            return shouldKillBeforeStoreUpdate();
        }

        protected boolean shouldKillAfterStoreUpdate() {
            final boolean kill = this.killAfterStoreUpdate;
            if(this.toggleKillAfterStoreUpdate) {
                this.killAfterStoreUpdate = !kill;
                LOG.warn("Toggle KILL after store update to: " + this.killAfterStoreUpdate);
            }
            return kill;
        }

        protected boolean shouldKillAfterStoreUpdate(final boolean isSuspended) {
            return (isSuspended && !killIfSuspended) ? false : shouldKillAfterStoreUpdate();
        }
    }

    public interface ProcedureExecutorListener {
        void procedureLoaded(long procId);

        void procedureAdded(long procId);

        void procedureFinished(long procId);
    }

    /**
     * Map the the procId returned by submitProcedure(), the Root-ProcID, to the Procedure.
     * Once a Root-Procedure completes (success or failure), the result will be added to this map.
     * The user of ProcedureExecutor should call getResult(procId) to get the result.
     */
    private final ConcurrentHashMap<Long, CompletedProcedureRetainer<TEnvironment>> completed = new ConcurrentHashMap<>();

    /**
     * Map the the procId returned by submitProcedure(), the Root-ProcID, to the RootProcedureState.
     * The RootProcedureState contains the execution stack of the Root-Procedure,
     * It is added to the map by submitProcedure() and removed on procedure completion.
     */
    private final ConcurrentHashMap<Long, RootProcedureState<TEnvironment>> rollbackStack = new ConcurrentHashMap<>();

    /**
     * Helper map to lookup the live procedures by ID.
     * This map contains every procedure. root-procedures and subprocedures.
     */
    private final ConcurrentHashMap<Long, Procedure<TEnvironment>> procedures = new ConcurrentHashMap<>();

    /**
     * Helper map to lookup whether the procedure already issued from the same client. This map
     * contains every root procedure.
     */
    private final ConcurrentHashMap<NonceKey, Long> nonceKeysToProcIdsMap = new ConcurrentHashMap<>();

    private final CopyOnWriteArrayList<ProcedureExecutorListener> listeners = new CopyOnWriteArrayList<>();

    private Configuration conf;

    /**
     * Created in the {@link #init(int, boolean)} method. Destroyed in {@link #join()} (FIX! Doing
     * resource handling rather than observing in a #join is unexpected).
     * Overridden when we do the ProcedureTestingUtility.testRecoveryAndDoubleExecution trickery
     * (Should be ok).
     */
    private ThreadGroup threadGroup;

    /**
     * Created in the {@link #init(int, boolean)}  method. Terminated in {@link #join()} (FIX! Doing
     * resource handling rather than observing in a #join is unexpected).
     * Overridden when we do the ProcedureTestingUtility.testRecoveryAndDoubleExecution trickery
     * (Should be ok).
     */
    private CopyOnWriteArrayList<WorkerThread> workerThreads;

    /**
     * Created in the {@link #init(int, boolean)} method. Terminated in {@link #join()} (FIX! Doing
     * resource handling rather than observing in a #join is unexpected).
     * Overridden when we do the ProcedureTestingUtility.testRecoveryAndDoubleExecution trickery
     * (Should be ok).
     */
    private TimeoutExecutorThread<TEnvironment> timeoutExecutor;

    /**
     * WorkerMonitor check for stuck workers and new worker thread when necessary, for example if
     * there is no worker to assign meta, it will new worker thread for it, so it is very important.
     * TimeoutExecutor execute many tasks like DeadServerMetricRegionChore RegionInTransitionChore
     * and so on, some tasks may execute for a long time so will block other tasks like
     * WorkerMonitor, so use a dedicated thread for executing WorkerMonitor.
     */
    private TimeoutExecutorThread<TEnvironment> workerMonitorExecutor;

    private int corePoolSize;
    private int maxPoolSize;

    private volatile long keepAliveTime;

    /**
     * Scheduler/Queue that contains runnable procedures.
     */
    private final ProcedureScheduler scheduler;

    private final Executor forceUpdateExecutor = Executors
            .newSingleThreadExecutor(new ThreadFactoryBuilder().setDaemon(true).setNameFormat("Force-Update-PEWorker-%d").build());

    private final AtomicLong lastProcId = new AtomicLong(-1);
    private final AtomicLong workerId = new AtomicLong(0);
    private final AtomicInteger activeExecutorCount = new AtomicInteger(0);
    private final AtomicBoolean running = new AtomicBoolean(false);
    private final TEnvironment environment;
    private final ProcedureStore store;

    private final boolean checkOwnerSet;

    // To prevent concurrent execution of the same procedure.
    // For some rare cases, especially if the procedure uses ProcedureEvent, it is possible that the
    // procedure is woken up before we finish the suspend which causes the same procedures to be
    // executed in parallel. This does lead to some problems, see HBASE-20939&HBASE-20949, and is also
    // a bit confusing to the developers. So here we introduce this lock to prevent the concurrent
    // execution of the same procedure.
    private final IdLock procExecutionLock = new IdLock();

    public ProcedureExecutor(final Configuration conf, final TEnvironment environment, final ProcedureStore store) {
        this(conf, environment, store, new SimpleProcedureScheduler());
    }

    private boolean isRootFinished(Procedure<?> proc) {
        Procedure<?> rootProc = procedures.get(proc.getRootProcId());
        return rootProc == null || rootProc.isFinished();
    }

    private void forceUpdateProcedure(long procId) throws IOException {
        IdLock.Entry lockEntry = procExecutionLock.getLockEntry(procId);
        try {
            Procedure<TEnvironment> proc = procedures.get(procId);
            if(proc != null) {
                if(proc.isFinished() && proc.hasParent() && isRootFinished(proc)) {
                    LOG.debug("Procedure {} has already been finished and parent is succeeded," + " skip force updating", proc);
                    return;
                }
            } else {
                CompletedProcedureRetainer<TEnvironment> retainer = completed.get(procId);
                if(retainer == null || retainer.getProcedure() instanceof FailedProcedure) {
                    LOG.debug("No pending procedure with id = {}, skip force updating.", procId);
                    return;
                }
                long evictTtl = conf.getInt(EVICT_TTL_CONF_KEY, DEFAULT_EVICT_TTL);
                long evictAckTtl = conf.getInt(EVICT_ACKED_TTL_CONF_KEY, DEFAULT_ACKED_EVICT_TTL);
                if(retainer.isExpired(System.currentTimeMillis(), evictTtl, evictAckTtl)) {
                    LOG.debug("Procedure {} has already been finished and expired, skip force updating", procId);
                    return;
                }
                proc = retainer.getProcedure();
            }
            LOG.debug("Force update procedure {}", proc);
            store.update(proc);
        } finally {
            procExecutionLock.releaseLockEntry(lockEntry);
        }
    }

    public ProcedureExecutor(final Configuration conf, final TEnvironment environment, final ProcedureStore store, final ProcedureScheduler scheduler) {
        this.environment = environment;
        this.scheduler = scheduler;

         // store = WALProcedureStore
        this.store = store;
        this.conf = conf;
        this.checkOwnerSet = conf.getBoolean(CHECK_OWNER_SET_CONF_KEY, DEFAULT_CHECK_OWNER_SET);
        refreshConfiguration(conf);
        store.registerListener(new ProcedureStoreListener() {

            @Override
            public void forceUpdate(long[] procIds) {
                Arrays.stream(procIds).forEach(procId -> forceUpdateExecutor.execute(() -> {
                    try {
                        forceUpdateProcedure(procId);
                    } catch(IOException e) {
                        LOG.warn("Failed to force update procedure with pid={}", procId);
                    }
                }));
            }
        });
    }

    private void load(final boolean abortOnCorruption) throws IOException {
        Preconditions.checkArgument(completed.isEmpty(), "completed not empty");
        Preconditions.checkArgument(rollbackStack.isEmpty(), "rollback state not empty");
        Preconditions.checkArgument(procedures.isEmpty(), "procedure map not empty");
        Preconditions.checkArgument(scheduler.size() == 0, "run queue not empty");

        store.load(new ProcedureStore.ProcedureLoader() {
            @Override
            public void setMaxProcId(long maxProcId) {
                assert lastProcId.get() < 0 : "expected only one call to setMaxProcId()";
                lastProcId.set(maxProcId);
            }

            @Override
            public void load(ProcedureIterator procIter) throws IOException {
                loadProcedures(procIter, abortOnCorruption);
            }

            @Override
            public void handleCorrupted(ProcedureIterator procIter) throws IOException {
                int corruptedCount = 0;
                while(procIter.hasNext()) {
                    Procedure<?> proc = procIter.next();
                    LOG.error("Corrupt " + proc);
                    corruptedCount++;
                }
                if(abortOnCorruption && corruptedCount > 0) {
                    throw new IOException("found " + corruptedCount + " corrupted procedure(s) on replay");
                }
            }
        });
    }

    private void restoreLock(Procedure<TEnvironment> proc, Set<Long> restored) {
        proc.restoreLock(getEnvironment());
        restored.add(proc.getProcId());
    }

    private void restoreLocks(Deque<Procedure<TEnvironment>> stack, Set<Long> restored) {
        while(!stack.isEmpty()) {
            restoreLock(stack.pop(), restored);
        }
    }

    // Restore the locks for all the procedures.
    // Notice that we need to restore the locks starting from the root proc, otherwise there will be
    // problem that a sub procedure may hold the exclusive lock first and then we are stuck when
    // calling the acquireLock method for the parent procedure.
    // The algorithm is straight-forward:
    // 1. Use a set to record the procedures which locks have already been restored.
    // 2. Use a stack to store the hierarchy of the procedures
    // 3. For all the procedure, we will first try to find its parent and push it into the stack,
    // unless
    // a. We have no parent, i.e, we are the root procedure
    // b. The lock has already been restored(by checking the set introduced in #1)
    // then we start to pop the stack and call acquireLock for each procedure.
    // Notice that this should be done for all procedures, not only the ones in runnableList.
    private void restoreLocks() {
        Set<Long> restored = new HashSet<>();
        Deque<Procedure<TEnvironment>> stack = new ArrayDeque<>();
        procedures.values().forEach(proc -> {
            for(; ; ) {
                if(restored.contains(proc.getProcId())) {
                    restoreLocks(stack, restored);
                    return;
                }
                if(!proc.hasParent()) {
                    restoreLock(proc, restored);
                    restoreLocks(stack, restored);
                    return;
                }
                stack.push(proc);
                proc = procedures.get(proc.getParentProcId());
            }
        });
    }

    private void loadProcedures(ProcedureIterator procIter, boolean abortOnCorruption) throws IOException {
        // 1. Build the rollback stack
        int runnableCount = 0;
        int failedCount = 0;
        int waitingCount = 0;
        int waitingTimeoutCount = 0;
        while(procIter.hasNext()) {
            boolean finished = procIter.isNextFinished();
            @SuppressWarnings("unchecked") Procedure<TEnvironment> proc = procIter.next();
            NonceKey nonceKey = proc.getNonceKey();
            long procId = proc.getProcId();

            if(finished) {
                completed.put(proc.getProcId(), new CompletedProcedureRetainer<>(proc));
                LOG.debug("Completed {}", proc);
            } else {
                if(!proc.hasParent()) {
                    assert !proc.isFinished() : "unexpected finished procedure";
                    rollbackStack.put(proc.getProcId(), new RootProcedureState<>());
                }

                // add the procedure to the map
                proc.beforeReplay(getEnvironment());
                procedures.put(proc.getProcId(), proc);
                switch(proc.getState()) {
                    case RUNNABLE:
                        runnableCount++;
                        break;
                    case FAILED:
                        failedCount++;
                        break;
                    case WAITING:
                        waitingCount++;
                        break;
                    case WAITING_TIMEOUT:
                        waitingTimeoutCount++;
                        break;
                    default:
                        break;
                }
            }

            if(nonceKey != null) {
                nonceKeysToProcIdsMap.put(nonceKey, procId); // add the nonce to the map
            }
        }

        // 2. Initialize the stacks: In the old implementation, for procedures in FAILED state, we will
        // push it into the ProcedureScheduler directly to execute the rollback. But this does not work
        // after we introduce the restore lock stage. For now, when we acquire a xlock, we will remove
        // the queue from runQueue in scheduler, and then when a procedure which has lock access, for
        // example, a sub procedure of the procedure which has the xlock, is pushed into the scheduler,
        // we will add the queue back to let the workers poll from it. The assumption here is that, the
        // procedure which has the xlock should have been polled out already, so when loading we can not
        // add the procedure to scheduler first and then call acquireLock, since the procedure is still
        // in the queue, and since we will remove the queue from runQueue, then no one can poll it out,
        // then there is a dead lock
        List<Procedure<TEnvironment>> runnableList = new ArrayList<>(runnableCount);
        List<Procedure<TEnvironment>> failedList = new ArrayList<>(failedCount);
        List<Procedure<TEnvironment>> waitingList = new ArrayList<>(waitingCount);
        List<Procedure<TEnvironment>> waitingTimeoutList = new ArrayList<>(waitingTimeoutCount);
        procIter.reset();
        while(procIter.hasNext()) {
            if(procIter.isNextFinished()) {
                procIter.skipNext();
                continue;
            }

            @SuppressWarnings("unchecked") Procedure<TEnvironment> proc = procIter.next();
            assert !(proc.isFinished() && !proc.hasParent()) : "unexpected completed proc=" + proc;
            LOG.debug("Loading {}", proc);
            Long rootProcId = getRootProcedureId(proc);
            // The orphan procedures will be passed to handleCorrupted, so add an assert here
            assert rootProcId != null;

            if(proc.hasParent()) {
                Procedure<TEnvironment> parent = procedures.get(proc.getParentProcId());
                if(parent != null && !proc.isFinished()) {
                    parent.incChildrenLatch();
                }
            }

            RootProcedureState<TEnvironment> procStack = rollbackStack.get(rootProcId);
            procStack.loadStack(proc);

            proc.setRootProcId(rootProcId);
            switch(proc.getState()) {
                case RUNNABLE:
                    runnableList.add(proc);
                    break;
                case WAITING:
                    waitingList.add(proc);
                    break;
                case WAITING_TIMEOUT:
                    waitingTimeoutList.add(proc);
                    break;
                case FAILED:
                    failedList.add(proc);
                    break;
                case ROLLEDBACK:
                case INITIALIZING:
                    String msg = "Unexpected " + proc.getState() + " state for " + proc;
                    LOG.error(msg);
                    throw new UnsupportedOperationException(msg);
                default:
                    break;
            }
        }

        // 3. Check the waiting procedures to see if some of them can be added to runnable.
        waitingList.forEach(proc -> {
            if(!proc.hasChildren()) {
                // Normally, WAITING procedures should be waken by its children. But, there is a case that,
                // all the children are successful and before they can wake up their parent procedure, the
                // master was killed. So, during recovering the procedures from ProcedureWal, its children
                // are not loaded because of their SUCCESS state. So we need to continue to run this WAITING
                // procedure. But before executing, we need to set its state to RUNNABLE, otherwise, a
                // exception will throw:
                // Preconditions.checkArgument(procedure.getState() == ProcedureState.RUNNABLE,
                // "NOT RUNNABLE! " + procedure.toString());
                proc.setState(ProcedureState.RUNNABLE);
                runnableList.add(proc);
            } else {
                proc.afterReplay(getEnvironment());
            }
        });
        // 4. restore locks
        restoreLocks();

        // 5. Push the procedures to the timeout executor
        waitingTimeoutList.forEach(proc -> {
            proc.afterReplay(getEnvironment());
            timeoutExecutor.add(proc);
        });

        // 6. Push the procedure to the scheduler
        failedList.forEach(scheduler::addBack);
        runnableList.forEach(p -> {
            p.afterReplay(getEnvironment());
            if(!p.hasParent()) {
                sendProcedureLoadedNotification(p.getProcId());
            }
            scheduler.addBack(p);
        });
        // After all procedures put into the queue, signal the worker threads.
        // Otherwise, there is a race condition. See HBASE-21364.
        scheduler.signalAll();
    }

    /**
     * Initialize the procedure executor, but do not start workers. We will start them later.
     * <p/>
     * It calls ProcedureStore.recoverLease() and ProcedureStore.load() to recover the lease, and
     * ensure a single executor, and start the procedure replay to resume and recover the previous
     * pending and in-progress procedures.
     *
     * @param numThreads        number of threads available for procedure execution.
     * @param abortOnCorruption true if you want to abort your service in case a corrupted procedure is found on replay. otherwise false.
     */
    public void init(int numThreads, boolean abortOnCorruption) throws IOException {
        // We have numThreads executor + one timer thread used for timing out procedures and triggering periodic procedures.
        this.corePoolSize = numThreads;
        this.maxPoolSize = 10 * numThreads;
        LOG.info("Starting {} core workers (bigger of cpus/4 or 16) with max (burst) worker count={}", corePoolSize, maxPoolSize);

        this.threadGroup = new ThreadGroup("PEWorkerGroup");
        this.timeoutExecutor = new TimeoutExecutorThread<>(this, threadGroup, "ProcExecTimeout");
        this.workerMonitorExecutor = new TimeoutExecutorThread<>(this, threadGroup, "WorkerMonitor");

        // Create the workers
        workerId.set(0);
        workerThreads = new CopyOnWriteArrayList<>();

        for(int i = 0; i < corePoolSize; ++i) {
            workerThreads.add(new WorkerThread(threadGroup)); // TODO 核心代码
        }

        long st, et;

        // Acquire the store lease.
        st = System.nanoTime();
        store.recoverLease();
        et = System.nanoTime();
        LOG.info("Recovered {} lease in {}", store.getClass().getSimpleName(), StringUtils.humanTimeDiff(TimeUnit.NANOSECONDS.toMillis(et - st)));

        // start the procedure scheduler
        scheduler.start();

        st = System.nanoTime();
        load(abortOnCorruption);
        et = System.nanoTime();
        LOG.info("Loaded {} in {}", store.getClass().getSimpleName(), StringUtils.humanTimeDiff(TimeUnit.NANOSECONDS.toMillis(et - st)));
    }

    /**
     * Start the workers.
     */
    public void startWorkers() throws IOException {
        if(!running.compareAndSet(false, true)) {
            LOG.warn("Already running");
            return;
        }
        // Start the executors. Here we must have the lastProcId set.
        LOG.trace("Start workers {}", workerThreads.size());
        timeoutExecutor.start();
        workerMonitorExecutor.start();

        for(WorkerThread worker : workerThreads) {
            worker.start(); // TODO 核心代码
        }

        // Internal chores
        workerMonitorExecutor.add(new WorkerMonitor());

        // Add completed cleaner chore
        addChore(new CompletedProcedureCleaner<>(conf, store, procExecutionLock, completed, nonceKeysToProcIdsMap));
    }

    public void stop() {
        if(!running.getAndSet(false)) {
            return;
        }

        LOG.info("Stopping");
        scheduler.stop();
        timeoutExecutor.sendStopSignal();
        workerMonitorExecutor.sendStopSignal();
    }

    @VisibleForTesting
    public void join() {
        assert !isRunning() : "expected not running";

        // stop the timeout executor
        timeoutExecutor.awaitTermination();
        // stop the work monitor executor
        workerMonitorExecutor.awaitTermination();

        // stop the worker threads
        for(WorkerThread worker : workerThreads) {
            worker.awaitTermination();
        }

        // Destroy the Thread Group for the executors
        // TODO: Fix. #join is not place to destroy resources.
        try {
            threadGroup.destroy();
        } catch(IllegalThreadStateException e) {
            LOG.error("ThreadGroup {} contains running threads; {}: See STDOUT", this.threadGroup, e.getMessage());
            // This dumps list of threads on STDOUT.
            this.threadGroup.list();
        }

        // reset the in-memory state for testing
        completed.clear();
        rollbackStack.clear();
        procedures.clear();
        nonceKeysToProcIdsMap.clear();
        scheduler.clear();
        lastProcId.set(-1);
    }

    public void refreshConfiguration(final Configuration conf) {
        this.conf = conf;
        setKeepAliveTime(conf.getLong(WORKER_KEEP_ALIVE_TIME_CONF_KEY, DEFAULT_WORKER_KEEP_ALIVE_TIME), TimeUnit.MILLISECONDS);
    }

    // ==========================================================================
    //  Accessors
    // ==========================================================================
    public boolean isRunning() {
        return running.get();
    }

    /**
     * @return the current number of worker threads.
     */
    public int getWorkerThreadCount() {
        return workerThreads.size();
    }

    /**
     * @return the core pool size settings.
     */
    public int getCorePoolSize() {
        return corePoolSize;
    }

    public int getActiveExecutorCount() {
        return activeExecutorCount.get();
    }

    public TEnvironment getEnvironment() {
        return this.environment;
    }

    public ProcedureStore getStore() {
        return this.store;
    }

    ProcedureScheduler getScheduler() {
        return scheduler;
    }

    public void setKeepAliveTime(final long keepAliveTime, final TimeUnit timeUnit) {
        this.keepAliveTime = timeUnit.toMillis(keepAliveTime);
        this.scheduler.signalAll();
    }

    public long getKeepAliveTime(final TimeUnit timeUnit) {
        return timeUnit.convert(keepAliveTime, TimeUnit.MILLISECONDS);
    }

    // ==========================================================================
    //  Submit/Remove Chores
    // ==========================================================================

    /**
     * Add a chore procedure to the executor
     *
     * @param chore the chore to add
     */
    public void addChore(ProcedureInMemoryChore<TEnvironment> chore) {
        chore.setState(ProcedureState.WAITING_TIMEOUT);
        timeoutExecutor.add(chore);
    }

    /**
     * Remove a chore procedure from the executor
     *
     * @param chore the chore to remove
     * @return whether the chore is removed, or it will be removed later
     */
    public boolean removeChore(ProcedureInMemoryChore<TEnvironment> chore) {
        chore.setState(ProcedureState.SUCCESS);
        return timeoutExecutor.remove(chore);
    }

    // ==========================================================================
    //  Nonce Procedure helpers
    // ==========================================================================

    /**
     * Create a NonceKey from the specified nonceGroup and nonce.
     *
     * @param nonceGroup the group to use for the {@link NonceKey}
     * @param nonce      the nonce to use in the {@link NonceKey}
     * @return the generated NonceKey
     */
    public NonceKey createNonceKey(final long nonceGroup, final long nonce) {
        return (nonce == HConstants.NO_NONCE) ? null : new NonceKey(nonceGroup, nonce);
    }

    /**
     * Register a nonce for a procedure that is going to be submitted.
     * A procId will be reserved and on submitProcedure(),
     * the procedure with the specified nonce will take the reserved ProcId.
     * If someone already reserved the nonce, this method will return the procId reserved,
     * otherwise an invalid procId will be returned. and the caller should procede
     * and submit the procedure.
     *
     * @param nonceKey A unique identifier for this operation from the client or process.
     * @return the procId associated with the nonce, if any otherwise an invalid procId.
     */
    public long registerNonce(final NonceKey nonceKey) {
        if(nonceKey == null) {
            return -1;
        }

        // check if we have already a Reserved ID for the nonce
        Long oldProcId = nonceKeysToProcIdsMap.get(nonceKey);
        if(oldProcId == null) {
            // reserve a new Procedure ID, this will be associated with the nonce
            // and the procedure submitted with the specified nonce will use this ID.
            final long newProcId = nextProcId();
            oldProcId = nonceKeysToProcIdsMap.putIfAbsent(nonceKey, newProcId);
            if(oldProcId == null) {
                return -1;
            }
        }

        // we found a registered nonce, but the procedure may not have been submitted yet.
        // since the client expect the procedure to be submitted, spin here until it is.
        final boolean traceEnabled = LOG.isTraceEnabled();
        while(isRunning() && !(procedures.containsKey(oldProcId) || completed.containsKey(oldProcId)) && nonceKeysToProcIdsMap
                .containsKey(nonceKey)) {
            if(traceEnabled) {
                LOG.trace("Waiting for pid=" + oldProcId.longValue() + " to be submitted");
            }
            Threads.sleep(100);
        }
        return oldProcId.longValue();
    }

    /**
     * Remove the NonceKey if the procedure was not submitted to the executor.
     *
     * @param nonceKey A unique identifier for this operation from the client or process.
     */
    public void unregisterNonceIfProcedureWasNotSubmitted(final NonceKey nonceKey) {
        if(nonceKey == null) {
            return;
        }

        final Long procId = nonceKeysToProcIdsMap.get(nonceKey);
        if(procId == null) {
            return;
        }

        // if the procedure was not submitted, remove the nonce
        if(!(procedures.containsKey(procId) || completed.containsKey(procId))) {
            nonceKeysToProcIdsMap.remove(nonceKey);
        }
    }

    /**
     * If the failure failed before submitting it, we may want to give back the
     * same error to the requests with the same nonceKey.
     *
     * @param nonceKey  A unique identifier for this operation from the client or process
     * @param procName  name of the procedure, used to inform the user
     * @param procOwner name of the owner of the procedure, used to inform the user
     * @param exception the failure to report to the user
     */
    public void setFailureResultForNonce(NonceKey nonceKey, String procName, User procOwner, IOException exception) {
        if(nonceKey == null) {
            return;
        }

        Long procId = nonceKeysToProcIdsMap.get(nonceKey);
        if(procId == null || completed.containsKey(procId)) {
            return;
        }

        Procedure<TEnvironment> proc = new FailedProcedure<>(procId.longValue(), procName, procOwner, nonceKey, exception);

        completed.putIfAbsent(procId, new CompletedProcedureRetainer<>(proc));
    }

    // ==========================================================================
    //  Submit/Abort Procedure
    // ==========================================================================

    /**
     * Add a new root-procedure to the executor.
     *
     * @param proc the new procedure to execute.
     * @return the procedure id, that can be used to monitor the operation
     */
    public long submitProcedure(Procedure<TEnvironment> proc) {

        /********
         * TODO_MA 马中华 https://blog.csdn.net/zhongqi2513
         *   注释：
         */
        return submitProcedure(proc, null);
    }

    /**
     * Bypass a procedure. If the procedure is set to bypass, all the logic in
     * execute/rollback will be ignored and it will return success, whatever.
     * It is used to recover buggy stuck procedures, releasing the lock resources
     * and letting other procedures run. Bypassing one procedure (and its ancestors will
     * be bypassed automatically) may leave the cluster in a middle state, e.g. region
     * not assigned, or some hdfs files left behind. After getting rid of those stuck procedures,
     * the operators may have to do some clean up on hdfs or schedule some assign procedures
     * to let region online. DO AT YOUR OWN RISK.
     * <p>
     * A procedure can be bypassed only if
     * 1. The procedure is in state of RUNNABLE, WAITING, WAITING_TIMEOUT
     * or it is a root procedure without any child.
     * 2. No other worker thread is executing it
     * 3. No child procedure has been submitted
     *
     * <p>
     * If all the requirements are meet, the procedure and its ancestors will be
     * bypassed and persisted to WAL.
     *
     * <p>
     * If the procedure is in WAITING state, will set it to RUNNABLE add it to run queue.
     * TODO: What about WAITING_TIMEOUT?
     *
     * @param pids      the procedure id
     * @param lockWait  time to wait lock
     * @param force     if force set to true, we will bypass the procedure even if it is executing.
     *                  This is for procedures which can't break out during executing(due to bug, mostly)
     *                  In this case, bypassing the procedure is not enough, since it is already stuck
     *                  there. We need to restart the master after bypassing, and letting the problematic
     *                  procedure to execute wth bypass=true, so in that condition, the procedure can be
     *                  successfully bypassed.
     * @param recursive We will do an expensive search for children of each pid. EXPENSIVE!
     * @return true if bypass success
     * @throws IOException IOException
     */
    public List<Boolean> bypassProcedure(List<Long> pids, long lockWait, boolean force, boolean recursive) throws IOException {
        List<Boolean> result = new ArrayList<Boolean>(pids.size());
        for(long pid : pids) {
            result.add(bypassProcedure(pid, lockWait, force, recursive));
        }
        return result;
    }

    boolean bypassProcedure(long pid, long lockWait, boolean override, boolean recursive) throws IOException {
        Preconditions.checkArgument(lockWait > 0, "lockWait should be positive");
        final Procedure<TEnvironment> procedure = getProcedure(pid);
        if(procedure == null) {
            LOG.debug("Procedure pid={} does not exist, skipping bypass", pid);
            return false;
        }

        LOG.debug("Begin bypass {} with lockWait={}, override={}, recursive={}", procedure, lockWait, override, recursive);
        IdLock.Entry lockEntry = procExecutionLock.tryLockEntry(procedure.getProcId(), lockWait);
        if(lockEntry == null && !override) {
            LOG.debug("Waited {} ms, but {} is still running, skipping bypass with force={}", lockWait, procedure, override);
            return false;
        } else if(lockEntry == null) {
            LOG.debug("Waited {} ms, but {} is still running, begin bypass with force={}", lockWait, procedure, override);
        }
        try {
            // check whether the procedure is already finished
            if(procedure.isFinished()) {
                LOG.debug("{} is already finished, skipping bypass", procedure);
                return false;
            }

            if(procedure.hasChildren()) {
                if(recursive) {
                    // EXPENSIVE. Checks each live procedure of which there could be many!!!
                    // Is there another way to get children of a procedure?
                    LOG.info("Recursive bypass on children of pid={}", procedure.getProcId());
                    this.procedures.forEachValue(1 /*Single-threaded*/,
                            // Transformer
                            v -> v.getParentProcId() == procedure.getProcId() ? v : null,
                            // Consumer
                            v -> {
                                try {
                                    bypassProcedure(v.getProcId(), lockWait, override, recursive);
                                } catch(IOException e) {
                                    LOG.warn("Recursive bypass of pid={}", v.getProcId(), e);
                                }
                            });
                } else {
                    LOG.debug("{} has children, skipping bypass", procedure);
                    return false;
                }
            }

            // If the procedure has no parent or no child, we are safe to bypass it in whatever state
            if(procedure.hasParent() && procedure.getState() != ProcedureState.RUNNABLE && procedure.getState() != ProcedureState.WAITING && procedure
                    .getState() != ProcedureState.WAITING_TIMEOUT) {
                LOG.debug("Bypassing procedures in RUNNABLE, WAITING and WAITING_TIMEOUT states " + "(with no parent), {}", procedure);
                // Question: how is the bypass done here?
                return false;
            }

            // Now, the procedure is not finished, and no one can execute it since we take the lock now
            // And we can be sure that its ancestor is not running too, since their child has not
            // finished yet
            Procedure<TEnvironment> current = procedure;
            while(current != null) {
                LOG.debug("Bypassing {}", current);
                current.bypass(getEnvironment());
                store.update(procedure);
                long parentID = current.getParentProcId();
                current = getProcedure(parentID);
            }

            //wake up waiting procedure, already checked there is no child
            if(procedure.getState() == ProcedureState.WAITING) {
                procedure.setState(ProcedureState.RUNNABLE);
                store.update(procedure);
            }

            // If state of procedure is WAITING_TIMEOUT, we can directly submit it to the scheduler.
            // Instead we should remove it from timeout Executor queue and tranfer its state to RUNNABLE
            if(procedure.getState() == ProcedureState.WAITING_TIMEOUT) {
                LOG.debug("transform procedure {} from WAITING_TIMEOUT to RUNNABLE", procedure);
                if(timeoutExecutor.remove(procedure)) {
                    LOG.debug("removed procedure {} from timeoutExecutor", procedure);
                    timeoutExecutor.executeTimedoutProcedure(procedure);
                }
            } else if(lockEntry != null) {
                scheduler.addFront(procedure);
                LOG.debug("Bypassing {} and its ancestors successfully, adding to queue", procedure);
            } else {
                // If we don't have the lock, we can't re-submit the queue,
                // since it is already executing. To get rid of the stuck situation, we
                // need to restart the master. With the procedure set to bypass, the procedureExecutor
                // will bypass it and won't get stuck again.
                LOG.debug("Bypassing {} and its ancestors successfully, but since it is already running, " + "skipping add to queue", procedure);
            }
            return true;

        } finally {
            if(lockEntry != null) {
                procExecutionLock.releaseLockEntry(lockEntry);
            }
        }
    }

    /**
     * Add a new root-procedure to the executor.
     *
     * @param proc     the new procedure to execute.
     * @param nonceKey the registered unique identifier for this operation from the client or process.
     * @return the procedure id, that can be used to monitor the operation
     */
    @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "NP_NULL_ON_SOME_PATH", justification = "FindBugs is blind to the check-for-null")
    public long submitProcedure(Procedure<TEnvironment> proc, NonceKey nonceKey) {
        Preconditions.checkArgument(lastProcId.get() >= 0);

        /********
         * TODO_MA 马中华 https://blog.csdn.net/zhongqi2513
         *   注释： Prepare
         */
        prepareProcedure(proc);

        final Long currentProcId;
        if(nonceKey != null) {
            currentProcId = nonceKeysToProcIdsMap.get(nonceKey);
            String errorMsg = "Expected nonceKey=" + nonceKey + " to be reserved, use registerNonce(); proc=" + proc;
            Preconditions.checkArgument(currentProcId != null, errorMsg);
        } else {
            currentProcId = nextProcId();
        }

        /********
         * TODO_MA 马中华 https://blog.csdn.net/zhongqi2513
         *   注释： Initialize
         */
        // Initialize the procedure
        proc.setNonceKey(nonceKey);
        proc.setProcId(currentProcId.longValue());

        /********
         * TODO_MA 马中华 https://blog.csdn.net/zhongqi2513
         *   注释： Commit
         *   store = WALProcedureStore
         */
        // Commit the transaction
        store.insert(proc, null);
        LOG.debug("Stored {}", proc);

        /********
         * TODO_MA 马中华 https://blog.csdn.net/zhongqi2513
         *   注释： push
         */
        // Add the procedure to the executor
        return pushProcedure(proc);
    }

    /**
     * Add a set of new root-procedure to the executor.
     *
     * @param procs the new procedures to execute.
     */
    // TODO: Do we need to take nonces here?
    public void submitProcedures(Procedure<TEnvironment>[] procs) {
        Preconditions.checkArgument(lastProcId.get() >= 0);
        if(procs == null || procs.length <= 0) {
            return;
        }

        // Prepare procedure
        for(int i = 0; i < procs.length; ++i) {
            prepareProcedure(procs[i]).setProcId(nextProcId());
        }

        // Commit the transaction
        store.insert(procs);
        if(LOG.isDebugEnabled()) {
            LOG.debug("Stored " + Arrays.toString(procs));
        }

        // Add the procedure to the executor
        for(int i = 0; i < procs.length; ++i) {
            pushProcedure(procs[i]);
        }
    }

    private Procedure<TEnvironment> prepareProcedure(Procedure<TEnvironment> proc) {
        Preconditions.checkArgument(proc.getState() == ProcedureState.INITIALIZING);
        Preconditions.checkArgument(!proc.hasParent(), "unexpected parent", proc);
        if(this.checkOwnerSet) {
            Preconditions.checkArgument(proc.hasOwner(), "missing owner");
        }
        return proc;
    }

    private long pushProcedure(Procedure<TEnvironment> proc) {
        final long currentProcId = proc.getProcId();

        /********
         * TODO_MA 马中华 https://blog.csdn.net/zhongqi2513
         *   注释： 初始化
         */
        // Update metrics on start of a procedure
        proc.updateMetricsOnSubmit(getEnvironment());

        /********
         * TODO_MA 马中华 https://blog.csdn.net/zhongqi2513
         *   注释： 回滚
         */
        // Create the rollback stack for the procedure
        RootProcedureState<TEnvironment> stack = new RootProcedureState<>();
        rollbackStack.put(currentProcId, stack);

        // Submit the new subprocedures
        assert !procedures.containsKey(currentProcId);
        procedures.put(currentProcId, proc);

        /********
         * TODO_MA 马中华 https://blog.csdn.net/zhongqi2513
         *   注释：
         */
        sendProcedureAddedNotification(currentProcId);

        // TODO_MA 注释：
        scheduler.addBack(proc);

        return proc.getProcId();
    }

    /**
     * Send an abort notification the specified procedure.
     * Depending on the procedure implementation the abort can be considered or ignored.
     *
     * @param procId the procedure to abort
     * @return true if the procedure exists and has received the abort, otherwise false.
     */
    public boolean abort(long procId) {
        return abort(procId, true);
    }

    /**
     * Send an abort notification to the specified procedure.
     * Depending on the procedure implementation, the abort can be considered or ignored.
     *
     * @param procId                the procedure to abort
     * @param mayInterruptIfRunning if the proc completed at least one step, should it be aborted?
     * @return true if the procedure exists and has received the abort, otherwise false.
     */
    public boolean abort(long procId, boolean mayInterruptIfRunning) {
        Procedure<TEnvironment> proc = procedures.get(procId);
        if(proc != null) {
            if(!mayInterruptIfRunning && proc.wasExecuted()) {
                return false;
            }
            return proc.abort(getEnvironment());
        }
        return false;
    }

    // ==========================================================================
    //  Executor query helpers
    // ==========================================================================
    public Procedure<TEnvironment> getProcedure(final long procId) {
        return procedures.get(procId);
    }

    public <T extends Procedure<TEnvironment>> T getProcedure(Class<T> clazz, long procId) {
        Procedure<TEnvironment> proc = getProcedure(procId);
        if(clazz.isInstance(proc)) {
            return clazz.cast(proc);
        }
        return null;
    }

    public Procedure<TEnvironment> getResult(long procId) {
        CompletedProcedureRetainer<TEnvironment> retainer = completed.get(procId);
        if(retainer == null) {
            return null;
        } else {
            return retainer.getProcedure();
        }
    }

    /**
     * Return true if the procedure is finished.
     * The state may be "completed successfully" or "failed and rolledback".
     * Use getResult() to check the state or get the result data.
     *
     * @param procId the ID of the procedure to check
     * @return true if the procedure execution is finished, otherwise false.
     */
    public boolean isFinished(final long procId) {
        return !procedures.containsKey(procId);
    }

    /**
     * Return true if the procedure is started.
     *
     * @param procId the ID of the procedure to check
     * @return true if the procedure execution is started, otherwise false.
     */
    public boolean isStarted(long procId) {
        Procedure<?> proc = procedures.get(procId);
        if(proc == null) {
            return completed.get(procId) != null;
        }
        return proc.wasExecuted();
    }

    /**
     * Mark the specified completed procedure, as ready to remove.
     *
     * @param procId the ID of the procedure to remove
     */
    public void removeResult(long procId) {
        CompletedProcedureRetainer<TEnvironment> retainer = completed.get(procId);
        if(retainer == null) {
            assert !procedures.containsKey(procId) : "pid=" + procId + " is still running";
            LOG.debug("pid={} already removed by the cleaner.", procId);
            return;
        }

        // The CompletedProcedureCleaner will take care of deletion, once the TTL is expired.
        retainer.setClientAckTime(EnvironmentEdgeManager.currentTime());
    }

    public Procedure<TEnvironment> getResultOrProcedure(long procId) {
        CompletedProcedureRetainer<TEnvironment> retainer = completed.get(procId);
        if(retainer == null) {
            return procedures.get(procId);
        } else {
            return retainer.getProcedure();
        }
    }

    /**
     * Check if the user is this procedure's owner
     *
     * @param procId the target procedure
     * @param user   the user
     * @return true if the user is the owner of the procedure,
     * false otherwise or the owner is unknown.
     */
    public boolean isProcedureOwner(long procId, User user) {
        if(user == null) {
            return false;
        }
        final Procedure<TEnvironment> runningProc = procedures.get(procId);
        if(runningProc != null) {
            return runningProc.getOwner().equals(user.getShortName());
        }

        final CompletedProcedureRetainer<TEnvironment> retainer = completed.get(procId);
        if(retainer != null) {
            return retainer.getProcedure().getOwner().equals(user.getShortName());
        }

        // Procedure either does not exist or has already completed and got cleaned up.
        // At this time, we cannot check the owner of the procedure
        return false;
    }

    /**
     * Should only be used when starting up, where the procedure workers have not been started.
     * <p/>
     * If the procedure works has been started, the return values maybe changed when you are
     * processing it so usually this is not safe. Use {@link #getProcedures()} below for most cases as
     * it will do a copy, and also include the finished procedures.
     */
    public Collection<Procedure<TEnvironment>> getActiveProceduresNoCopy() {
        return procedures.values();
    }

    /**
     * Get procedures.
     *
     * @return the procedures in a list
     */
    public List<Procedure<TEnvironment>> getProcedures() {
        List<Procedure<TEnvironment>> procedureList = new ArrayList<>(procedures.size() + completed.size());
        procedureList.addAll(procedures.values());
        // Note: The procedure could show up twice in the list with different state, as
        // it could complete after we walk through procedures list and insert into
        // procedureList - it is ok, as we will use the information in the Procedure
        // to figure it out; to prevent this would increase the complexity of the logic.
        completed.values().stream().map(CompletedProcedureRetainer::getProcedure).forEach(procedureList::add);
        return procedureList;
    }

    // ==========================================================================
    //  Listeners helpers
    // ==========================================================================
    public void registerListener(ProcedureExecutorListener listener) {
        this.listeners.add(listener);
    }

    public boolean unregisterListener(ProcedureExecutorListener listener) {
        return this.listeners.remove(listener);
    }

    private void sendProcedureLoadedNotification(final long procId) {
        if(!this.listeners.isEmpty()) {
            for(ProcedureExecutorListener listener : this.listeners) {
                try {

                    /********
                     * TODO_MA 马中华 https://blog.csdn.net/zhongqi2513
                     *   注释：
                     */
                    listener.procedureLoaded(procId);

                } catch(Throwable e) {
                    LOG.error("Listener " + listener + " had an error: " + e.getMessage(), e);
                }
            }
        }
    }

    private void sendProcedureAddedNotification(final long procId) {
        if(!this.listeners.isEmpty()) {
            for(ProcedureExecutorListener listener : this.listeners) {
                try {

                    /********
                     * TODO_MA 马中华 https://blog.csdn.net/zhongqi2513
                     *   注释：
                     */
                    listener.procedureAdded(procId);
                } catch(Throwable e) {
                    LOG.error("Listener " + listener + " had an error: " + e.getMessage(), e);
                }
            }
        }
    }

    private void sendProcedureFinishedNotification(final long procId) {
        if(!this.listeners.isEmpty()) {
            for(ProcedureExecutorListener listener : this.listeners) {
                try {
                    listener.procedureFinished(procId);
                } catch(Throwable e) {
                    LOG.error("Listener " + listener + " had an error: " + e.getMessage(), e);
                }
            }
        }
    }

    // ==========================================================================
    //  Procedure IDs helpers
    // ==========================================================================
    private long nextProcId() {
        long procId = lastProcId.incrementAndGet();
        if(procId < 0) {
            while(!lastProcId.compareAndSet(procId, 0)) {
                procId = lastProcId.get();
                if(procId >= 0) {
                    break;
                }
            }
            while(procedures.containsKey(procId)) {
                procId = lastProcId.incrementAndGet();
            }
        }
        assert procId >= 0 : "Invalid procId " + procId;
        return procId;
    }

    @VisibleForTesting
    protected long getLastProcId() {
        return lastProcId.get();
    }

    @VisibleForTesting
    public Set<Long> getActiveProcIds() {
        return procedures.keySet();
    }

    Long getRootProcedureId(Procedure<TEnvironment> proc) {
        return Procedure.getRootProcedureId(procedures, proc);
    }

    // ==========================================================================
    //  Executions
    // ==========================================================================
    private void executeProcedure(Procedure<TEnvironment> proc) {
        if(proc.isFinished()) {
            return;
        }
        final Long rootProcId = getRootProcedureId(proc);
        if(rootProcId == null) {
            LOG.warn("Rollback because parent is done/rolledback proc=" + proc);
            executeRollback(proc);
            return;
        }

        RootProcedureState<TEnvironment> procStack = rollbackStack.get(rootProcId);
        if(procStack == null) {
            LOG.warn("RootProcedureState is null for " + proc.getProcId());
            return;
        }
        do {
            // Try to acquire the execution
            if(!procStack.acquire(proc)) {
                if(procStack.setRollback()) {
                    // we have the 'rollback-lock' we can start rollingback
                    switch(executeRollback(rootProcId, procStack)) {
                        case LOCK_ACQUIRED:
                            break;
                        case LOCK_YIELD_WAIT:
                            procStack.unsetRollback();
                            scheduler.yield(proc);
                            break;
                        case LOCK_EVENT_WAIT:
                            LOG.info("LOCK_EVENT_WAIT rollback..." + proc);
                            procStack.unsetRollback();
                            break;
                        default:
                            throw new UnsupportedOperationException();
                    }
                } else {
                    if(!proc.wasExecuted()) {
                        switch(executeRollback(proc)) {
                            case LOCK_ACQUIRED:
                                break;
                            case LOCK_YIELD_WAIT:
                                scheduler.yield(proc);
                                break;
                            case LOCK_EVENT_WAIT:
                                LOG.info("LOCK_EVENT_WAIT can't rollback child running?..." + proc);
                                break;
                            default:
                                throw new UnsupportedOperationException();
                        }
                    }
                }
                break;
            }

            // Execute the procedure
            assert proc.getState() == ProcedureState.RUNNABLE : proc;
            // Note that lock is NOT about concurrency but rather about ensuring
            // ownership of a procedure of an entity such as a region or table
            LockState lockState = acquireLock(proc);
            switch(lockState) {
                case LOCK_ACQUIRED:
                    execProcedure(procStack, proc); // TODO 核心代码
                    break;
                case LOCK_YIELD_WAIT:
                    LOG.info(lockState + " " + proc);
                    scheduler.yield(proc);
                    break;
                case LOCK_EVENT_WAIT:
                    // Someone will wake us up when the lock is available
                    LOG.debug(lockState + " " + proc);
                    break;
                default:
                    throw new UnsupportedOperationException();
            }

            procStack.release(proc);

            if(proc.isSuccess()) {
                // update metrics on finishing the procedure
                proc.updateMetricsOnFinish(getEnvironment(), proc.elapsedTime(), true);
                LOG.info("Finished " + proc + " in " + StringUtils.humanTimeDiff(proc.elapsedTime()));
                // Finalize the procedure state
                if(proc.getProcId() == rootProcId) {
                    procedureFinished(proc);
                } else {
                    execCompletionCleanup(proc);
                }
                break;
            }
        } while(procStack.isFailed());
    }

    private LockState acquireLock(Procedure<TEnvironment> proc) {
        TEnvironment env = getEnvironment();
        // if holdLock is true, then maybe we already have the lock, so just return LOCK_ACQUIRED if
        // hasLock is true.
        if(proc.hasLock()) {
            return LockState.LOCK_ACQUIRED;
        }
        return proc.doAcquireLock(env, store);
    }

    private void releaseLock(Procedure<TEnvironment> proc, boolean force) {
        TEnvironment env = getEnvironment();
        // For how the framework works, we know that we will always have the lock
        // when we call releaseLock(), so we can avoid calling proc.hasLock()
        if(force || !proc.holdLock(env) || proc.isFinished()) {
            proc.doReleaseLock(env, store);
        }
    }

    /**
     * Execute the rollback of the full procedure stack. Once the procedure is rolledback, the
     * root-procedure will be visible as finished to user, and the result will be the fatal exception.
     */
    private LockState executeRollback(long rootProcId, RootProcedureState<TEnvironment> procStack) {
        Procedure<TEnvironment> rootProc = procedures.get(rootProcId);
        RemoteProcedureException exception = rootProc.getException();
        // TODO: This needs doc. The root proc doesn't have an exception. Maybe we are
        // rolling back because the subprocedure does. Clarify.
        if(exception == null) {
            exception = procStack.getException();
            rootProc.setFailure(exception);
            store.update(rootProc);
        }

        List<Procedure<TEnvironment>> subprocStack = procStack.getSubproceduresStack();
        assert subprocStack != null : "Called rollback with no steps executed rootProc=" + rootProc;

        int stackTail = subprocStack.size();
        while(stackTail-- > 0) {
            Procedure<TEnvironment> proc = subprocStack.get(stackTail);
            IdLock.Entry lockEntry = null;
            // Hold the execution lock if it is not held by us. The IdLock is not reentrant so we need
            // this check, as the worker will hold the lock before executing a procedure. This is the only
            // place where we may hold two procedure execution locks, and there is a fence in the
            // RootProcedureState where we can make sure that only one worker can execute the rollback of
            // a RootProcedureState, so there is no dead lock problem. And the lock here is necessary to
            // prevent race between us and the force update thread.
            if(!procExecutionLock.isHeldByCurrentThread(proc.getProcId())) {
                try {
                    lockEntry = procExecutionLock.getLockEntry(proc.getProcId());
                } catch(IOException e) {
                    // can only happen if interrupted, so not a big deal to propagate it
                    throw new UncheckedIOException(e);
                }
            }
            try {
                // For the sub procedures which are successfully finished, we do not rollback them.
                // Typically, if we want to rollback a procedure, we first need to rollback it, and then
                // recursively rollback its ancestors. The state changes which are done by sub procedures
                // should be handled by parent procedures when rolling back. For example, when rolling back
                // a MergeTableProcedure, we will schedule new procedures to bring the offline regions
                // online, instead of rolling back the original procedures which offlined the regions(in
                // fact these procedures can not be rolled back...).
                if(proc.isSuccess()) {
                    // Just do the cleanup work, without actually executing the rollback
                    subprocStack.remove(stackTail);
                    cleanupAfterRollbackOneStep(proc);
                    continue;
                }
                LockState lockState = acquireLock(proc);
                if(lockState != LockState.LOCK_ACQUIRED) {
                    // can't take a lock on the procedure, add the root-proc back on the
                    // queue waiting for the lock availability
                    return lockState;
                }

                lockState = executeRollback(proc);
                releaseLock(proc, false);
                boolean abortRollback = lockState != LockState.LOCK_ACQUIRED;
                abortRollback |= !isRunning() || !store.isRunning();

                // allows to kill the executor before something is stored to the wal.
                // useful to test the procedure recovery.
                if(abortRollback) {
                    return lockState;
                }

                subprocStack.remove(stackTail);

                // if the procedure is kind enough to pass the slot to someone else, yield
                // if the proc is already finished, do not yield
                if(!proc.isFinished() && proc.isYieldAfterExecutionStep(getEnvironment())) {
                    return LockState.LOCK_YIELD_WAIT;
                }

                if(proc != rootProc) {
                    execCompletionCleanup(proc);
                }
            } finally {
                if(lockEntry != null) {
                    procExecutionLock.releaseLockEntry(lockEntry);
                }
            }
        }

        // Finalize the procedure state
        LOG.info("Rolled back {} exec-time={}", rootProc, StringUtils.humanTimeDiff(rootProc.elapsedTime()));
        procedureFinished(rootProc);
        return LockState.LOCK_ACQUIRED;
    }

    private void cleanupAfterRollbackOneStep(Procedure<TEnvironment> proc) {
        if(proc.removeStackIndex()) {
            if(!proc.isSuccess()) {
                proc.setState(ProcedureState.ROLLEDBACK);
            }

            // update metrics on finishing the procedure (fail)
            proc.updateMetricsOnFinish(getEnvironment(), proc.elapsedTime(), false);

            if(proc.hasParent()) {
                store.delete(proc.getProcId());
                procedures.remove(proc.getProcId());
            } else {
                final long[] childProcIds = rollbackStack.get(proc.getProcId()).getSubprocedureIds();
                if(childProcIds != null) {
                    store.delete(proc, childProcIds);
                } else {
                    store.update(proc);
                }
            }
        } else {
            store.update(proc);
        }
    }

    /**
     * Execute the rollback of the procedure step.
     * It updates the store with the new state (stack index)
     * or will remove completly the procedure in case it is a child.
     */
    private LockState executeRollback(Procedure<TEnvironment> proc) {
        try {
            proc.doRollback(getEnvironment());
        } catch(IOException e) {
            LOG.debug("Roll back attempt failed for {}", proc, e);
            return LockState.LOCK_YIELD_WAIT;
        } catch(InterruptedException e) {
            handleInterruptedException(proc, e);
            return LockState.LOCK_YIELD_WAIT;
        } catch(Throwable e) {
            // Catch NullPointerExceptions or similar errors...
            LOG.error(HBaseMarkers.FATAL, "CODE-BUG: Uncaught runtime exception for " + proc, e);
        }

        // allows to kill the executor before something is stored to the wal.
        // useful to test the procedure recovery.
        if(testing != null && testing.shouldKillBeforeStoreUpdate()) {
            String msg = "TESTING: Kill before store update";
            LOG.debug(msg);
            stop();
            throw new RuntimeException(msg);
        }

        cleanupAfterRollbackOneStep(proc);

        return LockState.LOCK_ACQUIRED;
    }

    private void yieldProcedure(Procedure<TEnvironment> proc) {
        releaseLock(proc, false);
        scheduler.yield(proc);
    }

    /**
     * Executes <code>procedure</code>
     * <ul>
     *  <li>Calls the doExecute() of the procedure
     *  <li>If the procedure execution didn't fail (i.e. valid user input)
     *  <ul>
     *    <li>...and returned subprocedures
     *    <ul><li>The subprocedures are initialized.
     *      <li>The subprocedures are added to the store
     *      <li>The subprocedures are added to the runnable queue
     *      <li>The procedure is now in a WAITING state, waiting for the subprocedures to complete
     *    </ul>
     *    </li>
     *   <li>...if there are no subprocedure
     *    <ul><li>the procedure completed successfully
     *      <li>if there is a parent (WAITING)
     *      <li>the parent state will be set to RUNNABLE
     *    </ul>
     *   </li>
     *  </ul>
     *  </li>
     *  <li>In case of failure
     *  <ul>
     *    <li>The store is updated with the new state</li>
     *    <li>The executor (caller of this method) will start the rollback of the procedure</li>
     *  </ul>
     *  </li>
     *  </ul>
     */
    private void execProcedure(RootProcedureState<TEnvironment> procStack, Procedure<TEnvironment> procedure) {
        Preconditions.checkArgument(procedure.getState() == ProcedureState.RUNNABLE, "NOT RUNNABLE! " + procedure.toString());

        boolean suspended = false;
        boolean reExecute = false;
        Procedure<TEnvironment>[] subprocs = null;
        do {
            reExecute = false;
            procedure.resetPersistence();
            try {
                subprocs = procedure.doExecute(getEnvironment()); // TODO 核心代码

                if(subprocs != null && subprocs.length == 0) {
                    subprocs = null;
                }
            } catch(ProcedureSuspendedException e) {
                LOG.trace("Suspend {}", procedure);
                suspended = true;
            } catch(ProcedureYieldException e) {
                LOG.trace("Yield {}", procedure, e);
                yieldProcedure(procedure);
                return;
            } catch(InterruptedException e) {
                LOG.trace("Yield interrupt {}", procedure, e);
                handleInterruptedException(procedure, e);
                yieldProcedure(procedure);
                return;
            } catch(Throwable e) {
                // Catch NullPointerExceptions or similar errors...
                String msg = "CODE-BUG: Uncaught runtime exception: " + procedure;
                LOG.error(msg, e);
                procedure.setFailure(new RemoteProcedureException(msg, e));
            }

            if(!procedure.isFailed()) {
                if(subprocs != null) {
                    if(subprocs.length == 1 && subprocs[0] == procedure) {
                        // Procedure returned itself. Quick-shortcut for a state machine-like procedure;
                        // i.e. we go around this loop again rather than go back out on the scheduler queue.
                        subprocs = null;
                        reExecute = true;
                        LOG.trace("Short-circuit to next step on pid={}", procedure.getProcId());
                    } else {
                        // Yield the current procedure, and make the subprocedure runnable
                        // subprocs may come back 'null'.
                        subprocs = initializeChildren(procStack, procedure, subprocs);
                        LOG.info("Initialized subprocedures=" + (subprocs == null ? null : Stream.of(subprocs).map(e -> "{" + e.toString() + "}").
                                collect(Collectors.toList()).toString()));
                    }
                } else if(procedure.getState() == ProcedureState.WAITING_TIMEOUT) {
                    LOG.trace("Added to timeoutExecutor {}", procedure);
                    timeoutExecutor.add(procedure);
                } else if(!suspended) {
                    // No subtask, so we are done
                    procedure.setState(ProcedureState.SUCCESS);
                }
            }

            // Add the procedure to the stack
            procStack.addRollbackStep(procedure);

            // allows to kill the executor before something is stored to the wal.
            // useful to test the procedure recovery.
            if(testing != null && testing.shouldKillBeforeStoreUpdate(suspended, procedure.hasParent())) {
                kill("TESTING: Kill BEFORE store update: " + procedure);
            }

            // TODO: The code here doesn't check if store is running before persisting to the store as
            // it relies on the method call below to throw RuntimeException to wind up the stack and
            // executor thread to stop. The statement following the method call below seems to check if
            // store is not running, to prevent scheduling children procedures, re-execution or yield
            // of this procedure. This may need more scrutiny and subsequent cleanup in future
            //
            // Commit the transaction even if a suspend (state may have changed). Note this append
            // can take a bunch of time to complete.
            if(procedure.needPersistence()) {
                updateStoreOnExec(procStack, procedure, subprocs);
            }

            // if the store is not running we are aborting
            if(!store.isRunning()) {
                return;
            }
            // if the procedure is kind enough to pass the slot to someone else, yield
            if(procedure.isRunnable() && !suspended && procedure.isYieldAfterExecutionStep(getEnvironment())) {
                yieldProcedure(procedure);
                return;
            }

            assert (reExecute && subprocs == null) || !reExecute;
        } while(reExecute);

        // Allows to kill the executor after something is stored to the WAL but before the below
        // state settings are done -- in particular the one on the end where we make parent
        // RUNNABLE again when its children are done; see countDownChildren.
        if(testing != null && testing.shouldKillAfterStoreUpdate(suspended)) {
            kill("TESTING: Kill AFTER store update: " + procedure);
        }

        // Submit the new subprocedures
        if(subprocs != null && !procedure.isFailed()) {
            submitChildrenProcedures(subprocs);
        }

        // we need to log the release lock operation before waking up the parent procedure, as there
        // could be race that the parent procedure may call updateStoreOnExec ahead of us and remove all
        // the sub procedures from store and cause problems...
        releaseLock(procedure, false);

        // if the procedure is complete and has a parent, count down the children latch.
        // If 'suspended', do nothing to change state -- let other threads handle unsuspend event.
        if(!suspended && procedure.isFinished() && procedure.hasParent()) {
            countDownChildren(procStack, procedure);
        }
    }

    private void kill(String msg) {
        LOG.debug(msg);
        stop();
        throw new RuntimeException(msg);
    }

    private Procedure<TEnvironment>[] initializeChildren(RootProcedureState<TEnvironment> procStack, Procedure<TEnvironment> procedure,
            Procedure<TEnvironment>[] subprocs) {
        assert subprocs != null : "expected subprocedures";
        final long rootProcId = getRootProcedureId(procedure);
        for(int i = 0; i < subprocs.length; ++i) {
            Procedure<TEnvironment> subproc = subprocs[i];
            if(subproc == null) {
                String msg = "subproc[" + i + "] is null, aborting the procedure";
                procedure.setFailure(new RemoteProcedureException(msg, new IllegalArgumentIOException(msg)));
                return null;
            }

            assert subproc.getState() == ProcedureState.INITIALIZING : subproc;
            subproc.setParentProcId(procedure.getProcId());
            subproc.setRootProcId(rootProcId);
            subproc.setProcId(nextProcId());
            procStack.addSubProcedure(subproc);
        }

        if(!procedure.isFailed()) {
            procedure.setChildrenLatch(subprocs.length);
            switch(procedure.getState()) {
                case RUNNABLE:
                    procedure.setState(ProcedureState.WAITING);
                    break;
                case WAITING_TIMEOUT:
                    timeoutExecutor.add(procedure);
                    break;
                default:
                    break;
            }
        }
        return subprocs;
    }

    private void submitChildrenProcedures(Procedure<TEnvironment>[] subprocs) {
        for(int i = 0; i < subprocs.length; ++i) {
            Procedure<TEnvironment> subproc = subprocs[i];
            subproc.updateMetricsOnSubmit(getEnvironment());
            assert !procedures.containsKey(subproc.getProcId());
            procedures.put(subproc.getProcId(), subproc);
            scheduler.addFront(subproc);
        }
    }

    private void countDownChildren(RootProcedureState<TEnvironment> procStack, Procedure<TEnvironment> procedure) {
        Procedure<TEnvironment> parent = procedures.get(procedure.getParentProcId());
        if(parent == null) {
            assert procStack.isRollingback();
            return;
        }

        // If this procedure is the last child awake the parent procedure
        if(parent.tryRunnable()) {
            // If we succeeded in making the parent runnable -- i.e. all of its
            // children have completed, move parent to front of the queue.
            store.update(parent);
            scheduler.addFront(parent);
            LOG.info("Finished subprocedure pid={}, resume processing parent {}", procedure.getProcId(), parent);
            return;
        }
    }

    private void updateStoreOnExec(RootProcedureState<TEnvironment> procStack, Procedure<TEnvironment> procedure,
            Procedure<TEnvironment>[] subprocs) {
        if(subprocs != null && !procedure.isFailed()) {
            if(LOG.isTraceEnabled()) {
                LOG.trace("Stored " + procedure + ", children " + Arrays.toString(subprocs));
            }
            store.insert(procedure, subprocs);
        } else {
            LOG.trace("Store update {}", procedure);
            if(procedure.isFinished() && !procedure.hasParent()) {
                // remove child procedures
                final long[] childProcIds = procStack.getSubprocedureIds();
                if(childProcIds != null) {
                    store.delete(procedure, childProcIds);
                    for(int i = 0; i < childProcIds.length; ++i) {
                        procedures.remove(childProcIds[i]);
                    }
                } else {
                    store.update(procedure);
                }
            } else {
                store.update(procedure);
            }
        }
    }

    private void handleInterruptedException(Procedure<TEnvironment> proc, InterruptedException e) {
        LOG.trace("Interrupt during {}. suspend and retry it later.", proc, e);
        // NOTE: We don't call Thread.currentThread().interrupt()
        // because otherwise all the subsequent calls e.g. Thread.sleep() will throw
        // the InterruptedException. If the master is going down, we will be notified
        // and the executor/store will be stopped.
        // (The interrupted procedure will be retried on the next run)
    }

    private void execCompletionCleanup(Procedure<TEnvironment> proc) {
        final TEnvironment env = getEnvironment();
        if(proc.hasLock()) {
            LOG.warn(
                    "Usually this should not happen, we will release the lock before if the procedure" + " is finished, even if the holdLock is true, arrive here means we have some holes where" + " we do not release the lock. And the releaseLock below may fail since the procedure may" + " have already been deleted from the procedure store.");
            releaseLock(proc, true);
        }
        try {
            proc.completionCleanup(env);
        } catch(Throwable e) {
            // Catch NullPointerExceptions or similar errors...
            LOG.error("CODE-BUG: uncatched runtime exception for procedure: " + proc, e);
        }
    }

    private void procedureFinished(Procedure<TEnvironment> proc) {
        // call the procedure completion cleanup handler
        execCompletionCleanup(proc);

        CompletedProcedureRetainer<TEnvironment> retainer = new CompletedProcedureRetainer<>(proc);

        // update the executor internal state maps
        if(!proc.shouldWaitClientAck(getEnvironment())) {
            retainer.setClientAckTime(0);
        }

        completed.put(proc.getProcId(), retainer);
        rollbackStack.remove(proc.getProcId());
        procedures.remove(proc.getProcId());

        // call the runnableSet completion cleanup handler
        try {
            scheduler.completionCleanup(proc);
        } catch(Throwable e) {
            // Catch NullPointerExceptions or similar errors...
            LOG.error("CODE-BUG: uncatched runtime exception for completion cleanup: {}", proc, e);
        }

        // Notify the listeners
        sendProcedureFinishedNotification(proc.getProcId());
    }

    RootProcedureState<TEnvironment> getProcStack(long rootProcId) {
        return rollbackStack.get(rootProcId);
    }

    @VisibleForTesting
    ProcedureScheduler getProcedureScheduler() {
        return scheduler;
    }

    @VisibleForTesting
    int getCompletedSize() {
        return completed.size();
    }

    @VisibleForTesting
    public IdLock getProcExecutionLock() {
        return procExecutionLock;
    }

    private class WorkerThread extends StoppableThread {
        private final AtomicLong executionStartTime = new AtomicLong(Long.MAX_VALUE);
        private volatile Procedure<TEnvironment> activeProcedure;

        public WorkerThread(ThreadGroup group) {
            this(group, "PEWorker-");
        }

        protected WorkerThread(ThreadGroup group, String prefix) {
            super(group, prefix + workerId.incrementAndGet());
            setDaemon(true);
        }

        @Override
        public void sendStopSignal() {
            scheduler.signalAll();
        }

        @Override
        public void run() {
            long lastUpdate = EnvironmentEdgeManager.currentTime();
            try {
                while(isRunning() && keepAlive(lastUpdate)) {
                    Procedure<TEnvironment> proc = scheduler.poll(keepAliveTime, TimeUnit.MILLISECONDS); // TODO 核心代码
                    if(proc == null) {
                        continue;
                    }
                    this.activeProcedure = proc;
                    int activeCount = activeExecutorCount.incrementAndGet();
                    int runningCount = store.setRunningProcedureCount(activeCount);
                    LOG.trace("Execute pid={} runningCount={}, activeCount={}", proc.getProcId(), runningCount, activeCount);
                    executionStartTime.set(EnvironmentEdgeManager.currentTime());
                    IdLock.Entry lockEntry = procExecutionLock.getLockEntry(proc.getProcId());
                    try {
                        executeProcedure(proc); // TODO 核心代码
                    } catch(AssertionError e) {
                        LOG.info("ASSERT pid=" + proc.getProcId(), e);
                        throw e;
                    } finally {
                        procExecutionLock.releaseLockEntry(lockEntry);
                        activeCount = activeExecutorCount.decrementAndGet();
                        runningCount = store.setRunningProcedureCount(activeCount);
                        LOG.trace("Halt pid={} runningCount={}, activeCount={}", proc.getProcId(), runningCount, activeCount);
                        this.activeProcedure = null;
                        lastUpdate = EnvironmentEdgeManager.currentTime();
                        executionStartTime.set(Long.MAX_VALUE);
                    }
                }
            } catch(Throwable t) {
                LOG.warn("Worker terminating UNNATURALLY {}", this.activeProcedure, t);
            } finally {
                LOG.trace("Worker terminated.");
            }
            workerThreads.remove(this);
        }

        @Override
        public String toString() {
            Procedure<?> p = this.activeProcedure;
            return getName() + "(pid=" + (p == null ? Procedure.NO_PROC_ID : p.getProcId() + ")");
        }

        /**
         * @return the time since the current procedure is running
         */
        public long getCurrentRunTime() {
            return EnvironmentEdgeManager.currentTime() - executionStartTime.get();
        }

        // core worker never timeout
        protected boolean keepAlive(long lastUpdate) {
            return true;
        }
    }

    // A worker thread which can be added when core workers are stuck. Will timeout after
    // keepAliveTime if there is no procedure to run.
    private final class KeepAliveWorkerThread extends WorkerThread {
        public KeepAliveWorkerThread(ThreadGroup group) {
            super(group, "KeepAlivePEWorker-");
        }

        @Override
        protected boolean keepAlive(long lastUpdate) {
            return EnvironmentEdgeManager.currentTime() - lastUpdate < keepAliveTime;
        }
    }

    // ----------------------------------------------------------------------------
    // TODO-MAYBE: Should we provide a InlineChore to notify the store with the
    // full set of procedures pending and completed to write a compacted
    // version of the log (in case is a log)?
    // In theory no, procedures are have a short life, so at some point the store
    // will have the tracker saying everything is in the last log.
    // ----------------------------------------------------------------------------

    private final class WorkerMonitor extends InlineChore {
        public static final String WORKER_MONITOR_INTERVAL_CONF_KEY = "hbase.procedure.worker.monitor.interval.msec";
        private static final int DEFAULT_WORKER_MONITOR_INTERVAL = 5000; // 5sec

        public static final String WORKER_STUCK_THRESHOLD_CONF_KEY = "hbase.procedure.worker.stuck.threshold.msec";
        private static final int DEFAULT_WORKER_STUCK_THRESHOLD = 10000; // 10sec

        public static final String WORKER_ADD_STUCK_PERCENTAGE_CONF_KEY = "hbase.procedure.worker.add.stuck.percentage";
        private static final float DEFAULT_WORKER_ADD_STUCK_PERCENTAGE = 0.5f; // 50% stuck

        private float addWorkerStuckPercentage = DEFAULT_WORKER_ADD_STUCK_PERCENTAGE;
        private int timeoutInterval = DEFAULT_WORKER_MONITOR_INTERVAL;
        private int stuckThreshold = DEFAULT_WORKER_STUCK_THRESHOLD;

        public WorkerMonitor() {
            refreshConfig();
        }

        @Override
        public void run() {
            final int stuckCount = checkForStuckWorkers();
            checkThreadCount(stuckCount);

            // refresh interval (poor man dynamic conf update)
            refreshConfig();
        }

        private int checkForStuckWorkers() {
            // check if any of the worker is stuck
            int stuckCount = 0;
            for(WorkerThread worker : workerThreads) {
                if(worker.getCurrentRunTime() < stuckThreshold) {
                    continue;
                }

                // WARN the worker is stuck
                stuckCount++;
                LOG.warn("Worker stuck {}, run time {}", worker, StringUtils.humanTimeDiff(worker.getCurrentRunTime()));
            }
            return stuckCount;
        }

        private void checkThreadCount(final int stuckCount) {
            // nothing to do if there are no runnable tasks
            if(stuckCount < 1 || !scheduler.hasRunnables()) {
                return;
            }

            // add a new thread if the worker stuck percentage exceed the threshold limit
            // and every handler is active.
            final float stuckPerc = ((float) stuckCount) / workerThreads.size();
            // let's add new worker thread more aggressively, as they will timeout finally if there is no
            // work to do.
            if(stuckPerc >= addWorkerStuckPercentage && workerThreads.size() < maxPoolSize) {
                final KeepAliveWorkerThread worker = new KeepAliveWorkerThread(threadGroup);
                workerThreads.add(worker);
                worker.start();
                LOG.debug("Added new worker thread {}", worker);
            }
        }

        private void refreshConfig() {
            addWorkerStuckPercentage = conf.getFloat(WORKER_ADD_STUCK_PERCENTAGE_CONF_KEY, DEFAULT_WORKER_ADD_STUCK_PERCENTAGE);
            timeoutInterval = conf.getInt(WORKER_MONITOR_INTERVAL_CONF_KEY, DEFAULT_WORKER_MONITOR_INTERVAL);
            stuckThreshold = conf.getInt(WORKER_STUCK_THRESHOLD_CONF_KEY, DEFAULT_WORKER_STUCK_THRESHOLD);
        }

        @Override
        public int getTimeoutInterval() {
            return timeoutInterval;
        }
    }
}
